[core] Introduce BucketSelector based on partition values to achieve bucket level predicate push down#7486
Conversation
Force-pushed from 686a764 to 858c506.
Pull request overview
Introduces a partition-aware BucketSelector to enable bucket-level predicate pushdown for compound predicates by evaluating partition predicates against concrete partition values during scan planning.
Changes:
- Add BucketSelector + PartitionValuePredicateVisitor and wire full-predicate propagation via withCompleteFilter to enable partition-aware bucket pruning.
- Update bucket filtering plumbing to use TriFilter<BinaryRow, Integer, Integer> (partition, bucket, totalBucket) end-to-end.
- Add new unit/integration tests covering compound-predicate bucket pruning (single-field and composite bucket keys).
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| paimon-core/src/main/java/org/apache/paimon/operation/BucketSelector.java | New partition-aware bucket selector that derives candidate buckets from predicates. |
| paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java | Refactored to return a BucketSelector (TriFilter) when eligible. |
| paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java | Passes the full predicate to scans via withCompleteFilter. |
| paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java | Adds withCompleteFilter and switches total-aware bucket filter to TriFilter. |
| paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java | Implements withCompleteFilter to install bucket-level pruning. |
| paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java | Implements withCompleteFilter to install bucket-level pruning. |
| paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java | Threads partition value into bucket filtering during manifest entry filtering. |
| paimon-core/src/main/java/org/apache/paimon/manifest/BucketFilter.java | Bucket filter now tests with (partition, bucket, totalBucket) via TriFilter. |
| paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java | Applies bucket filtering with partition context when scanning cached segments. |
| paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java | Constructs the new BucketSelectConverter instance. |
| paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java | Constructs the new BucketSelectConverter instance. |
| paimon-common/src/main/java/org/apache/paimon/utils/TriFilter.java | New 3-arg filter functional interface used for bucket pruning. |
| paimon-common/src/main/java/org/apache/paimon/predicate/PartitionValuePredicateVisitor.java | New visitor that evaluates partition-only leaf predicates against a concrete partition row. |
| paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java | Uses PredicateBuilder.and/or to simplify rebuilt compound predicates. |
| paimon-core/src/test/java/org/apache/paimon/table/BucketFilterScanTest.java | New integration test validating bucket pruning under compound predicates (single/composite keys). |
| paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectorTest.java | New unit tests for bucket selection behavior across predicate patterns and partitioned tables. |
| paimon-common/src/test/java/org/apache/paimon/predicate/PartitionValuePredicateVisitorTest.java | New unit tests validating predicate rewriting for partition values. |
| paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectConverterTest.java | Removed (replaced by BucketSelectorTest). |
| paimon-core/src/test/java/org/apache/paimon/manifest/BucketFilterTest.java | Removed (behavior now exercised via updated scan/bucket selector tests). |
```java
int rowCount = 1;
for (List<Object> values : bucketValues) {
    if (values == null) {
        return Optional.empty();
    }

    rowCount *= values.size();
    if (rowCount > MAX_VALUES) {
        return Optional.empty();
    }
```
In BucketSelector, rowCount *= values.size() uses int multiplication and can overflow into a negative value (e.g., two large IN-lists on a composite bucket key). That can bypass the rowCount > MAX_VALUES guard and then assembleRows may attempt to enumerate an enormous Cartesian product, risking hangs/OOM. Consider switching rowCount to long (or using checked multiplication) and short-circuiting as soon as the product exceeds MAX_VALUES (ideally also cap/dedup per-column value collection early to avoid holding huge literal lists).
Suggested change:

```diff
-        int rowCount = 1;
-        for (List<Object> values : bucketValues) {
-            if (values == null) {
-                return Optional.empty();
-            }
-            rowCount *= values.size();
-            if (rowCount > MAX_VALUES) {
-                return Optional.empty();
-            }
+        long rowCount = 1L;
+        for (List<Object> values : bucketValues) {
+            if (values == null) {
+                return Optional.empty();
+            }
+            int size = values.size();
+            if (size <= 0) {
+                // No possible values for this column, no matching rows
+                return Optional.empty();
+            }
+            // Check for overflow and limit total number of combinations
+            if (rowCount > (long) MAX_VALUES / size) {
+                return Optional.empty();
+            }
+            rowCount *= size;
```
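The divide-before-multiply guard in the suggestion can be demonstrated in isolation. Below is a standalone sketch, not the actual Paimon code: the class and method names and the `MAX_VALUES` cap are illustrative, but the guard mirrors the suggested change and can never overflow, because for positive integers `rowCount > MAX_VALUES / size` (integer division) is equivalent to `rowCount * size > MAX_VALUES`.

```java
import java.util.List;
import java.util.OptionalLong;

// Hedged sketch (not the actual BucketSelector code): counting the
// Cartesian-product size of per-column value lists with a checked guard.
public class CartesianCountSketch {
    // Illustrative cap, standing in for BucketSelector's MAX_VALUES.
    static final int MAX_VALUES = 1000;

    /** Returns the number of combinations, or empty if unknown or over the cap. */
    public static OptionalLong countCombinations(List<List<Object>> bucketValues) {
        long rowCount = 1L;
        for (List<Object> values : bucketValues) {
            if (values == null) {
                return OptionalLong.empty(); // unknown values for this column: give up
            }
            int size = values.size();
            if (size <= 0) {
                return OptionalLong.empty(); // no possible value: no matching rows
            }
            // Dividing first avoids overflow: for positive numbers,
            // rowCount > MAX_VALUES / size  <=>  rowCount * size > MAX_VALUES.
            if (rowCount > (long) MAX_VALUES / size) {
                return OptionalLong.empty();
            }
            rowCount *= size;
        }
        return OptionalLong.of(rowCount);
    }
}
```

With the original `int` multiplication, two IN-lists of 100 000 values each would wrap `rowCount` to a value that slips past the `> MAX_VALUES` check; the division guard rejects that input on the first iteration.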
```java
    builder.option(BUCKET_KEY.key(), "b");
}
Schema schema = builder.build();

Identifier tableId = identifier("test_bucket_filter");
catalog.createTable(tableId, schema, false);
Table table = catalog.getTable(tableId);

// ---- write data: 5 partitions × 20 b-values = 100 rows ----
GenericRow[] rows = new GenericRow[100];
int idx = 0;
```
These assertions hard-code specific bucket IDs (e.g., "3,1", "1,6"). That makes the test brittle to changes in the bucket hash implementation / BucketFunctionType defaults, even if bucket-level pruning is still correct. Consider computing expected bucket IDs using the same BucketFunction as production (and asserting on those), so the test validates pruning behavior without depending on a particular hash result.
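The reviewer's suggestion can be sketched generically. The helper below is hypothetical: `hashBucketKey` merely stands in for whatever `BucketFunction` the production table actually configures (it is not Paimon's real hash). The point is only that a test deriving expected bucket IDs from the same function as production stays valid if the hash implementation changes.

```java
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of deriving expected bucket IDs from a shared bucket
// function instead of hard-coding results like "3,1" in assertions.
public class ExpectedBuckets {
    // Stand-in for the production bucket function (an assumption, not Paimon's hash).
    static int hashBucketKey(Object... bucketKey) {
        return Objects.hash(bucketKey);
    }

    /** Bucket a single key maps to; always in [0, numBuckets). */
    public static int expectedBucket(int numBuckets, Object... bucketKey) {
        return Math.floorMod(hashBucketKey(bucketKey), numBuckets);
    }

    /** Distinct buckets that a set of candidate key values can land in. */
    public static Set<Integer> expectedBuckets(int numBuckets, Object[][] candidateKeys) {
        Set<Integer> buckets = new TreeSet<>();
        for (Object[] key : candidateKeys) {
            buckets.add(expectedBucket(numBuckets, key));
        }
        return buckets;
    }
}
```

A test would then assert that the scan reads exactly `expectedBuckets(totalBuckets, literalsFromPredicate)`, rather than a literal bucket-ID string.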
```java
package org.apache.paimon.utils;

/** Represents a filter (boolean-valued function) of three argument. */
```
Typo/grammar in the Javadoc: "filter ... of three argument" should be "... of three arguments".
Suggested change:

```diff
-/** Represents a filter (boolean-valued function) of three argument. */
+/** Represents a filter (boolean-valued function) of three arguments. */
```
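For reference, a three-argument filter interface along these lines is a minimal sketch; the PR's actual `TriFilter` may differ in package, default methods, or generics:

```java
/**
 * Represents a filter (boolean-valued function) of three arguments,
 * e.g. (partition, bucket, totalBucket) for bucket pruning.
 */
@FunctionalInterface
public interface TriFilter<T, U, V> {
    /** Returns true if the (t, u, v) triple passes the filter. */
    boolean test(T t, U u, V v);
}
```

A bucket filter is then just a lambda, e.g. `(partition, bucket, totalBucket) -> selectedBuckets.contains(bucket)`.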
```java
}
Schema schema = builder.build();

Identifier tableId = identifier("test_composite_bucket_filter");
catalog.createTable(tableId, schema, false);
Table table = catalog.getTable(tableId);

// ---- write data: 5 partitions × 20 b-values × 10 c-values = 1000 rows ----
GenericRow[] rows = new GenericRow[1000];
int idx = 0;
for (int a = 1; a <= 5; a++) {
    for (int b = 1; b <= 20; b++) {
        for (int c = 0; c < 10; c++) {
```
Same brittleness here: the expected results are hard-coded bucket IDs for composite keys (e.g., "3,9", "3,0"...). To keep the test stable across bucket hash / BucketFunction changes, consider deriving expected bucket IDs via BucketFunction from the (b,c) literals instead of asserting specific numeric buckets.
|
+1
Purpose
Introduce a BucketSelector based on partition values to achieve bucket-level predicate pushdown.
Case 1: bucket filtering with compound predicates on a single-field bucket key.
Table schema:
Data distribution: 5 partitions (a=1 to 5) × 20 b-values (b=1 to 20) = 100 rows.
Scenarios:
Case 2: bucket filtering with compound predicates on a composite (multi-field) bucket key.
Table schema:
Data distribution: 5 partitions (a=1 to 5) × 20 b-values (b=1 to 20) × 10 c-values (c=0 to 9) = 1000 rows.
Test scenarios:
Tests
API and Format
Documentation
Generative AI tooling